package com.dahuan.transform;

import com.dahuan.bean.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

import java.util.Collections;

public class Transform_Connect {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism( 1 );

        //从文件读取数据
        String path = "E:\\Project\\FlinkTutorials\\Flink-Scala\\src\\main\\resources\\sensor.txt";
        DataStreamSource<String> stringDataStreamSource = env.readTextFile( path );

        //输入类型是String 类型，返回什么类型就输出什么类型
        DataStream<SensorReading> dataStream = stringDataStreamSource.map( new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String value) throws Exception {
                String[] split = value.split( "," );
                return new SensorReading( split[0], new Long( split[1] ), new Double( split[2] ) );
            }
        } );

        //TODO  分流 , 按照温度值30为界分为两条流
        SplitStream<SensorReading> splitStream = dataStream.split( new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading sensorReading) {
                /**
                 * 看这个值的温度值是否大于30度,如果大于30 输出 high , 如果小于 30 输出low
                 *                                             返回一个只包含指定对象的不可变列表。
                 */
                return (sensorReading.getTemperature() > 30) ? Collections.singletonList( "high" ) : Collections.singletonList( "low" );
            }
        } );

        DataStream<SensorReading> highTempStream = splitStream.select( "high" );
        DataStream<SensorReading> lowTempStream = splitStream.select( "low" );
        DataStream<SensorReading> allTempStream = splitStream.select( "high", "low" );

        /*
        highTempStream.print("high");
        lowTempStream.print("low");
        allTempStream.print("all");*/


        //TODO 合流，connect , 将高温流转换成二元组类型 , 与低温流连接合并之后，输出状态信息
        SingleOutputStreamOperator<Tuple2<String, Double>> waringStream = highTempStream.map( new MapFunction<SensorReading, Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> map(SensorReading value) throws Exception {
                //获取id 和 温度值
                return new Tuple2<>(value.getId(),value.getTemperature());
            }
        } );

        //获取出来之后的高温流 用connect 方法联合低温流 使得低温流的变量与高温流相同
        ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStreams = waringStream.connect( lowTempStream );

        //CoMapFunction接口是将两个流在map上实现
        DataStream<Object> resultStream  = connectedStreams.map( new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {


            @Override
            public Object map1(Tuple2<String, Double> value) throws Exception {
               return new Tuple3<>(value.f0,value.f1,"high temp warning");
            }

            @Override
            public Object map2(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(),"normal");
            }
        } );

        resultStream.print();



        //union 联合多条流,可以重复
        highTempStream.union( lowTempStream,allTempStream ).print("union");



        env.execute( "Transform_Connect" );


    }
}
